Parse the GRC into something easier to work with

First, let's get the interesting stuff out of the GRC file. There's a bit of trial-and-error here as we figure out what's what in the file.

In [44]:
from xml.dom import minidom
# Parse the example GRC file into a minidom document for the exploratory cells below.
# NOTE(review): absolute, machine-specific path -- this only runs where that checkout exists.
xmldoc = minidom.parse('/home/jbm/src/gnuradio/gr-uhd/examples/grc/uhd_dpsk_mod.grc')

import xml.etree.ElementTree as ET  # for generation

from IPython.display import SVG, display

import math # because everyone needs a little trig in their life
In [45]:
# Exploratory: take the document's first child node.
n = xmldoc.childNodes[0]
In [46]:
# Rebind n to the first <flow_graph> element, looked up by tag name.
# The all_blocks and all_conns cells read their elements from this n.
n = xmldoc.getElementsByTagName("flow_graph")[0]
In [47]:
# Drill down by hand to the text of the first <key> element under the flow graph.
# The first statement's result is discarded; only the cell's last expression is displayed.
n.getElementsByTagName("block")[0]
m = n.getElementsByTagName("key")[0]
# First child node of that <key>; its .data is the value shown as the cell output.
l = m.childNodes[0]
l.data
Out[47]:
u'options'
In [48]:
# Scratch check of atan2(y, x): a point with x < 0 and y > 0 gives an angle just under pi.
math.atan2(7.0, -94.0)
Out[48]:
3.0672617664221278
In [49]:
class GRCVBlock:
    """A single <block> element of a GRC file, plus the logic to draw it as SVG.

    Attributes:
        key: text of the first <key> element found under the block element
            (e.g. "options", "variable").
        params: dict mapping each <param>'s key text to its value text. A
            missing or empty <value> is stored as "".
        connections: list of (direction, my_port, remote_id, remote_port)
            tuples registered through connect().
        port_coords: dict mapping "direction,port" to the (x, y) centre of
            that port after rotation. Only populated by attach_svg().
    """

    # Geometry of the drawn block, in SVG user units.
    PORT_WIDTH = 10
    PORT_HEIGHT = 20
    PORT_GAP = 2
    HEADER_HEIGHT = 22
    # Floor division keeps this an int (11) on both Python 2 and Python 3.
    FOOTER_HEIGHT = HEADER_HEIGHT // 2

    # Block key -> (fill colour, opacity); "_" is the fallback entry.
    PALETTE = {
        "options": ("grey", 0.1),
        "variable": ("#8888aa", 0.2),
        "variable_slider": ("#8888aa", 0.3),
        "import": ("#aa8888", 0.3),
        "notebook": ("#888888", 0.5),
        "note": ("#ff4444", 0.4),

        "_": ("#aa8888", 0.2)
    }

    def __init__(self, el):
        """Read the block key and all <param> key/value pairs from a minidom element.

        Raises IndexError if the block (or one of its params) has no <key>
        element or the <key> has no child node.
        """
        self.params = {}

        self.key = el.getElementsByTagName("key")[0].childNodes[0].data

        for p in el.getElementsByTagName("param"):
            k = p.getElementsByTagName("key")[0].childNodes[0].data
            try:
                v = p.getElementsByTagName("value")[0].childNodes[0].data
            except IndexError:
                # No <value> element, or a <value> without any child node.
                v = ""

            self.params[k] = v

        self.connections = []  # List of (direction, my_port, remote_id, remote_port)

        self.port_coords = {}

    def get_id(self):
        """Return the block's "id" parameter (KeyError if it has none)."""
        return self.params["id"]

    def connect(self, direction, my_port, remote_id, remote_port):
        """Record a connection on this block; an identical row is stored only once."""
        candidate_row = (direction, my_port, remote_id, remote_port)
        if candidate_row not in self.connections:
            self.connections.append(candidate_row)

    def ports(self, direction):
        """Return the distinct local port keys used by connections in `direction`.

        The result is a new list in no particular order.
        """
        return list(set(my_port
                        for d, my_port, _remote_id, _remote_port in self.connections
                        if d == direction))

    def coordinates(self):
        "Extract the x,y and rotation for this block"
        # "_coordinate" is stored as text such as "(743, 124)": drop the first
        # and last character, then split on the comma.
        raw_xy = self.params["_coordinate"]
        x, y = [float(part) for part in raw_xy[1:-1].split(",")]
        theta = float(self.params["_rotation"])

        return (x, y, theta)

    def coordinates_port(self, direction, port):
        """Return the (x, y) centre of a port as computed by attach_svg().

        Raises KeyError if attach_svg() has not run yet or the port is not
        part of any recorded connection.
        """
        return self.port_coords["%s,%s" % (direction, port)]

    def enabled(self):
        """Return the raw "_enabled" parameter text (a string, not a bool)."""
        return self.params["_enabled"]

    def _attach_ports(self, blk, direction, ports, x_p, y, x_c, y_c, t_rad):
        """Draw one column of port rectangles and record their rotated centres.

        blk       -- the block's <g> element that receives the port rectangles
        direction -- "in" or "out"; used as the prefix of the port_coords key
        ports     -- port keys, already in drawing order (top to bottom)
        x_p       -- x of the left edge of every port rectangle in the column
        y         -- y of the top edge of the block
        x_c, y_c  -- rotation centre of the block
        t_rad     -- block rotation in radians
        """
        cos_t = math.cos(t_rad)
        sin_t = math.sin(t_rad)

        for i, p in enumerate(ports):
            y_p = (y + self.HEADER_HEIGHT
                   + (i + 1) * self.PORT_GAP + i * self.PORT_HEIGHT)

            ET.SubElement(blk,
                          "rect",
                          x=str(x_p),
                          y=str(y_p),
                          width=str(self.PORT_WIDTH),
                          height=str(self.PORT_HEIGHT),
                          style="fill:grey; opacity: 0.5;")

            # Centre of the port rectangle relative to the rotation centre.
            x_rel = x_p + self.PORT_WIDTH / 2.0 - x_c
            y_rel = y_p + self.PORT_HEIGHT / 2.0 - y_c

            # Apply the same rotation the <g> transform applies, so that lines
            # drawn outside the group still meet the port where it is rendered.
            x_proj = x_c + x_rel * cos_t - y_rel * sin_t
            y_proj = y_c + x_rel * sin_t + y_rel * cos_t

            self.port_coords["%s,%s" % (direction, p)] = (x_proj, y_proj)

    def attach_svg(self, parent):
        """Append this block's drawing to `parent` and fill in port_coords.

        Adds a <g class="grc_block"> holding the base rectangle, the header
        text and one small rectangle per connected port. Input ports sit on
        the left edge, output ports on the right. A non-zero rotation becomes
        a rotate() transform about the block centre.
        """
        (x, y, t) = self.coordinates()

        t_rad = t * math.pi / 180.0

        header_text = self.get_id()

        if self.key == "note":
            header_text = "Note: %s" % self.params["note"]

        ports_in = sorted(self.ports("in"))
        ports_out = sorted(self.ports("out"))

        max_ports = max(len(ports_in), len(ports_out))

        h = (self.HEADER_HEIGHT
             + (self.PORT_HEIGHT + self.PORT_GAP) * max_ports
             + self.FOOTER_HEIGHT)
        w = 10 + 7 * len(header_text)

        # True division: w and h are ints, and Python 2's integer "/" would
        # truncate the centre by half a unit whenever either one is odd.
        x_c = x + w / 2.0
        y_c = y + h / 2.0

        palette_entry = self.PALETTE.get(self.key, self.PALETTE["_"])

        blk = ET.SubElement(parent, "g")
        blk.set("class", "grc_block")

        if abs(t) > 0.0001:
            blk.set("transform", "rotate(%f %f %f)" % (t, x_c, y_c))

        #################
        # Base rectangle
        rect = ET.SubElement(blk,
                             "rect",
                             x=str(x),
                             y=str(y),
                             height=str(h),
                             width=str(w),
                             style="fill: %s; opacity: %f;" % palette_entry)
        rect.set("class", "key_%s" % self.key)

        #################
        # Header text
        txt = ET.SubElement(blk,
                            "text",
                            x=str(x + 15),
                            y=str(y + self.HEADER_HEIGHT - 5),
                            height=str(h),
                            width=str(w),
                            # CSS has no "text-weight" property; bold is "font-weight".
                            style="font-weight: bold;")

        txt.text = header_text

        #############
        # I/O Ports
        self._attach_ports(blk, "in", ports_in,
                           x - self.PORT_WIDTH, y, x_c, y_c, t_rad)
        self._attach_ports(blk, "out", ports_out,
                           x + w, y, x_c, y_c, t_rad)

# Wrap every <block> element under the flow_graph element n in a GRCVBlock.
all_blocks = map(GRCVBlock, n.getElementsByTagName("block"))

# Convenience render so GRCVBlock can be tweaked and viewed from this one cell.
# GRCVFlowGraph is only defined in a later cell, so on a fresh kernel this raises
# NameError; the handler below prints the error instead of failing the cell.
try:
    # http://nbviewer.ipython.org/gist/rpmuller/5666810
    from IPython.display import SVG
    import xml.etree.ElementTree as ET

    #fg = GRCVFlowGraph("/home/jbm/src/gnuradio-grc-examples/receiver/apt_play.grc")
    fg = GRCVFlowGraph("rds_rx_janked.grc")
    display(SVG(fg.svg()))
except Exception, e:
    # Deliberately broad: any failure here is reported and otherwise ignored.
    print str(e)
    print "You're probably fine; this is premature to run this. It's here to make tweaking easier."
digital_mpsk_receiver_cc_0blocks_complex_to_real_0audio_decimaudio_decim_rategr_rds_panel_0blocks_keep_one_in_n_1blocks_keep_one_in_n_0baseband_ratenbfreq_offsetroot_raised_cosine_filter_0digital_diff_decoder_bb_0freq_xlating_fir_filter_xxx_0freq_xlating_fir_filter_xxx_1analog_quadrature_demod_cf_0samp_ratebb_decimvolumegr_rds_parser_0gainfreq_tunerds_rxfreqimport_0xlate_bandwidthaudio_rateanalog_wfm_rcv_0uhd_usrp_source_0gr_rds_decoder_0digital_binary_slicer_fb_0
In [50]:
# Sanity check: block key, id and parsed (x, y, rotation) for every block.
[ (b.key, b.get_id(), b.coordinates() ) for b in all_blocks ]
Out[50]:
[(u'options', u'uhd_dpsk_mod', (-5.0, -2.0, 0.0)),
 (u'variable', u'samps_per_sym', (743.0, 124.0, 0.0)),
 (u'variable', u'rolloff', (566.0, 548.0, 0.0)),
 (u'variable', u'nfilts', (653.0, 548.0, 0.0)),
 (u'parameter', u'tx_gain', (968.0, 0.0, 0.0)),
 (u'parameter', u'freq', (634.0, 0.0, 0.0)),
 (u'parameter', u'samp_rate', (508.0, 0.0, 0.0)),
 (u'parameter', u'address0', (162.0, 0.0, 0.0)),
 (u'parameter', u'freq_offset', (789.0, 0.0, 0.0)),
 (u'parameter', u'rx_gain', (1102.0, 0.0, 0.0)),
 (u'import', u'import_0', (169.0, 93.0, 0.0)),
 (u'blocks_multiply_const_vxx',
  u'blocks_multiply_const_vxx_0',
  (396.0, 161.0, 0.0)),
 (u'variable_qtgui_range', u'ampl', (916.0, 115.0, 0.0)),
 (u'parameter', u'address1', (322.0, 0.0, 0.0)),
 (u'uhd_usrp_sink', u'uhd_usrp_sink_0', (576.0, 131.0, 0.0)),
 (u'qtgui_const_sink_x', u'qtgui_const_sink_x_0', (1.0, 367.0, 180.0)),
 (u'uhd_usrp_source', u'uhd_usrp_source_0', (-1.0, 242.0, 0.0)),
 (u'digital_dxpsk_mod', u'digital_dxpsk_mod_0', (204.0, 138.0, 0.0)),
 (u'blocks_vector_source_x', u'blocks_vector_source_x_0', (6.0, 142.0, 0.0)),
 (u'qtgui_time_sink_x', u'qtgui_time_sink_x_0', (294.0, 265.0, 0.0)),
 (u'variable_qtgui_range', u'tun_freq', (5.0, 545.0, 0.0)),
 (u'variable_qtgui_range', u'tun_rx_gain', (391.0, 546.0, 0.0)),
 (u'variable_qtgui_range', u'tun_tx_gain', (277.0, 546.0, 0.0)),
 (u'variable_qtgui_range', u'rx_freq_off', (130.0, 547.0, 0.0)),
 (u'digital_dxpsk_demod', u'digital_dxpsk_demod_0', (294.0, 375.0, 0.0)),
 (u'blocks_unpacked_to_packed_xx',
  u'blocks_unpacked_to_packed_xx_0',
  (542.0, 420.0, 0.0)),
 (u'blocks_uchar_to_float', u'blocks_uchar_to_float_0', (747.0, 431.0, 0.0)),
 (u'qtgui_time_sink_x', u'qtgui_time_sink_x_1', (923.0, 405.0, 0.0)),
 (u'qtgui_freq_sink_x', u'qtgui_freq_sink_x_0', (610.0, 254.0, 0.0))]
In [51]:
from collections import defaultdict

# Tally, for every parameter name, how many blocks carry it.
paramcounts = defaultdict(int)
for b in all_blocks:
    for k in b.params:
        paramcounts[k] = paramcounts[k] + 1

# Parameter names ordered from most to least common. The sort is stable, so
# names with equal counts keep the dict's iteration order.
all_params = sorted(paramcounts, key=paramcounts.get, reverse=True)

# Show the seven most common parameter names with their counts.
for p in all_params[:7]:
    print("%s  === > %d" % (p, paramcounts[p]))
_rotation  === > 29
_coordinate  === > 29
id  === > 29
_enabled  === > 29
alias  === > 29
type  === > 18
value  === > 15

In [52]:
class GRCVConnection:
    """The two endpoints of one <connection> element.

    source and sink are (block id, port key) tuples taken from the
    source_block_id / source_key and sink_block_id / sink_key child
    elements. A tag that is absent, or present without any child node,
    yields None in that slot.
    """

    def __init__(self, el):
        self.source = (self._text(el, "source_block_id"),
                       self._text(el, "source_key"))
        self.sink = (self._text(el, "sink_block_id"),
                     self._text(el, "sink_key"))

    def _text(self, el, tag):
        """Return the data of the first child node of the first `tag` element, or None."""
        matches = el.getElementsByTagName(tag)
        if not matches:
            return None

        children = matches[0].childNodes
        if not children:
            return None

        return children[0].data
        

# Wrap every <connection> element under the flow_graph element n in a GRCVConnection.
all_conns = map(GRCVConnection, n.getElementsByTagName("connection"))

# Number of connections found (displayed as the cell output).
len(all_conns)
Out[52]:
10
In [53]:
# Scratch: character count of one label; GRCVBlock.attach_svg derives a block's width from len(header_text).
len("xlate_bandwidth")
Out[53]:
15
In [54]:
# List each connection as ((source block id, source port key), (sink block id, sink port key)).
[ (c.source, c.sink) for c in all_conns ]
Out[54]:
[((u'uhd_usrp_source_0', u'0'), (u'qtgui_const_sink_x_0', u'0')),
 ((u'uhd_usrp_source_0', u'0'), (u'qtgui_freq_sink_x_0', u'0')),
 ((u'digital_dxpsk_mod_0', u'0'), (u'blocks_multiply_const_vxx_0', u'0')),
 ((u'blocks_multiply_const_vxx_0', u'0'), (u'uhd_usrp_sink_0', u'0')),
 ((u'uhd_usrp_source_0', u'0'), (u'digital_dxpsk_demod_0', u'0')),
 ((u'uhd_usrp_source_0', u'0'), (u'qtgui_time_sink_x_0', u'0')),
 ((u'blocks_vector_source_x_0', u'0'), (u'digital_dxpsk_mod_0', u'0')),
 ((u'digital_dxpsk_demod_0', u'0'), (u'blocks_unpacked_to_packed_xx_0', u'0')),
 ((u'blocks_unpacked_to_packed_xx_0', u'0'),
  (u'blocks_uchar_to_float_0', u'0')),
 ((u'blocks_uchar_to_float_0', u'0'), (u'qtgui_time_sink_x_1', u'0'))]
In [65]:
import xml.etree.ElementTree as ET
from xml.dom import minidom

class GRCVFlowGraph:
    """A GRC flow graph loaded from a file, renderable as an SVG string.

    Attributes:
        blocks: dict mapping block id to its GRCVBlock.
        connections: list of GRCVConnection, in document order.
    """

    def __init__(self, path):
        """Parse the GRC file at `path` and wire every connection into its two blocks.

        Raises KeyError if a connection names a block id that is not in the file.
        """
        xmldoc = minidom.parse(path)
        fg = xmldoc.getElementsByTagName("flow_graph")[0]
        # XXX TODO Skip the timestamp

        self.blocks = {}

        for el in fg.getElementsByTagName("block"):
            b = GRCVBlock(el)

            self.blocks[b.get_id()] = b

        # A real list rather than map(): it is iterated again in svg(), and a
        # Python 3 map object would already be exhausted by the loop below.
        self.connections = [GRCVConnection(el)
                            for el in fg.getElementsByTagName("connection")]

        for c in self.connections:
            (src_id, src_port) = c.source
            (snk_id, snk_port) = c.sink

            # Register the link on both ends so each block knows which ports to draw.
            self.blocks[src_id].connect("out", src_port, snk_id, snk_port)
            self.blocks[snk_id].connect("in", snk_port, src_id, src_port)

    def bounds(self):
        """Return ((xmin, ymin), (xmax, ymax)) over the blocks' stored coordinates.

        Only each block's own (x, y) position is considered, not its drawn
        width or height. An empty graph yields ((0.0, 0.0), (0.0, 0.0)).
        """
        coords = [b.coordinates() for b in self.blocks.values()]

        if not coords:
            return ((0.0, 0.0), (0.0, 0.0))

        xs = [c[0] for c in coords]
        ys = [c[1] for c in coords]

        return ((min(xs), min(ys)), (max(xs), max(ys)))

    def svg(self):
        """Render the flow graph and return the serialized <svg> document.

        The return value is whatever ET.tostring() produces for the tree.
        """
        bbox = self.bounds()

        # Just take the maximum plus fudge factor for now
        (width, height) = bbox[1]
        width *= 1.20
        height *= 1.20

        svg = ET.Element('svg',
                         xmlns="http://www.w3.org/2000/svg",
                         version="1.1",
                         height="%f" % height,
                         width="%f" % width)

        # Arrowhead marker referenced by every connection line.
        # SVG attribute names are case-sensitive: the reference point must be
        # spelled refX / refY, otherwise it is ignored.
        defs = ET.SubElement(svg, "defs")
        arrow = ET.SubElement(defs, "marker", {
            "id": "markerArrow",
            "markerWidth": "9",
            "markerHeight": "9",
            "refX": "2",
            "refY": "6",
            "orient": "auto",
        })

        ET.SubElement(arrow, "path", d="M2,2 L2,9 L9,6 L2,2", style="fill: black;")

        canvas = ET.SubElement(svg, 'g', transform="translate(20 20)")

        # Blocks first: attach_svg() also computes the port coordinates that
        # the connection lines below look up.
        for b in self.blocks.values():
            b.attach_svg(canvas)

        for c in self.connections:
            (src_id, src_port) = c.source
            (snk_id, snk_port) = c.sink

            x0, y0 = self.blocks[src_id].coordinates_port("out", src_port)
            x1, y1 = self.blocks[snk_id].coordinates_port("in", snk_port)

            line = ET.SubElement(canvas,
                                 "line",
                                 x1=str(x0),
                                 y1=str(y0),
                                 x2=str(x1),
                                 y2=str(y1),
                                 stroke="#333333",
                                 style="marker-end: url(#markerArrow);")
            # Hyphenated attribute names cannot be passed as keyword arguments.
            line.set("stroke-width", "1.5")

        return ET.tostring(svg)
        
In [66]:
# Smoke test: build the graph and peek at the first 100 characters of the generated SVG.
fg = GRCVFlowGraph("rds_rx_janked.grc")
# The result of bounds() is discarded here; only the cell's last expression is displayed.
fg.bounds()
fg.svg()[0:100]
Out[66]:
'<svg height="1152.000000" version="1.1" width="1315.200000" xmlns="http://www.w3.org/2000/svg"><defs'

Now, how to display an SVG in-line?

In [67]:
# http://nbviewer.ipython.org/gist/rpmuller/5666810
from IPython.display import SVG
import xml.etree.ElementTree as ET

# Rebuild the graph and wrap its markup in IPython's SVG display object; as the
# cell's last expression it becomes the cell output.
fg = GRCVFlowGraph("rds_rx_janked.grc")

SVG(fg.svg())
Out[67]:
digital_mpsk_receiver_cc_0blocks_complex_to_real_0audio_decimaudio_decim_rategr_rds_panel_0blocks_keep_one_in_n_1blocks_keep_one_in_n_0baseband_ratenbfreq_offsetroot_raised_cosine_filter_0digital_diff_decoder_bb_0freq_xlating_fir_filter_xxx_0freq_xlating_fir_filter_xxx_1analog_quadrature_demod_cf_0samp_ratebb_decimvolumegr_rds_parser_0gainfreq_tunerds_rxfreqimport_0xlate_bandwidthaudio_rateanalog_wfm_rcv_0uhd_usrp_source_0gr_rds_decoder_0digital_binary_slicer_fb_0
In [68]:
# Same inline rendering for a second flow graph file.
# NOTE(review): absolute, machine-specific path -- this only runs where that checkout exists.
fg = GRCVFlowGraph("/home/jbm/src/misc/gr-rds/apps/rds_rx.grc")
SVG(fg.svg())
Out[68]:
rtlsdr_source_0digital_mpsk_receiver_cc_0blocks_complex_to_real_0freq_tuneuhd_usrp_source_0audio_decim_rategr_rds_panel_0wxgui_fftsink2_0_0_0blocks_keep_one_in_n_1blocks_keep_one_in_n_0fir_filter_xxx_3wxgui_waterfallsink2_0wxgui_fftsink2_0_0Note: PilotNote: L-Rbaseband_ratefir_filter_xxx_5nbfreq_offsetfir_filter_xxx_1root_raised_cosine_filter_0blocks_multiply_xx_0import_0digital_diff_decoder_bb_0wxgui_scopesink2_0freq_xlating_fir_filter_xxx_0audio_decimanalog_quadrature_demod_cf_0Note: L+Raudio_sink_0samp_ratebb_decimwxgui_fftsink2_0gr_rds_decoder_0analog_fm_deemph_0_0gr_rds_parser_0blocks_add_xx_0blocks_multiply_const_vxx_0rational_resampler_xxx_0_0wxgui_fftsink2_0_0_0_1_0_1wxgui_fftsink2_0_0_0_1_0_0rds_rxfreqgainanalog_fm_deemph_0_0_0volumeblocks_multiply_xx_1Note: RDSfir_filter_xxx_2wxgui_scopesink2_1audio_ratefreq_xlating_fir_filter_xxx_1blocks_sub_xx_0wxgui_fftsink2_0_0_0_1analog_wfm_rcv_0rational_resampler_xxx_0xlate_bandwidthdigital_binary_slicer_fb_0blocks_multiply_const_vxx_0_0analog_pll_refout_cc_0blocks_complex_to_imag_0
In []: